[core] Add cluster ID to the Python layer #37583
Conversation
src/ray/gcs/gcs_client/gcs_client.h (Outdated)

inline grpc::ClientContext &&PrepareContext(int64_t timeout_ms) {
  grpc::ClientContext context;
  if (timeout_ms != -1) {
    context.set_deadline(std::chrono::system_clock::now() +
                         std::chrono::milliseconds(timeout_ms));
  }
  if (!cluster_id_.IsNil()) {
    context.AddMetadata(kClusterIdKey, cluster_id_.Hex());
  }
  return std::move(context);
}
Suggested change:
-inline grpc::ClientContext &&PrepareContext(int64_t timeout_ms) {
+grpc::ClientContext PrepareContext(int64_t timeout_ms) {
   grpc::ClientContext context;
   if (timeout_ms != -1) {
     context.set_deadline(std::chrono::system_clock::now() +
                          std::chrono::milliseconds(timeout_ms));
   }
   if (!cluster_id_.IsNil()) {
     context.AddMetadata(kClusterIdKey, cluster_id_.Hex());
   }
-  return std::move(context);
+  return context;
 }
The compiler complains because there's no copy constructor for ClientContext. Moved the ClientContext out of the helper, the way it was before.
hmmm, interesting... because you actually can return a unique_ptr directly.
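A minimal sketch of the unique_ptr variant being suggested here (not the code that landed in the PR); it assumes the same cluster_id_ member and kClusterIdKey constant shown in the snippet above:

#include <chrono>
#include <memory>
#include <grpcpp/grpcpp.h>

// Member-function sketch on the GCS client, as in the snippet above.
std::unique_ptr<grpc::ClientContext> PrepareContext(int64_t timeout_ms) {
  // grpc::ClientContext is neither copyable nor movable, but a unique_ptr
  // that owns one can be returned by value.
  auto context = std::make_unique<grpc::ClientContext>();
  if (timeout_ms != -1) {
    context->set_deadline(std::chrono::system_clock::now() +
                          std::chrono::milliseconds(timeout_ms));
  }
  if (!cluster_id_.IsNil()) {
    context->AddMetadata(kClusterIdKey, cluster_id_.Hex());
  }
  return context;  // ownership transfers to the caller
}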
src/ray/gcs/gcs_client/gcs_client.cc (Outdated)

@@ -432,8 +416,7 @@ Status PythonGcsClient::GetAllJobInfo(int64_t timeout_ms,
 Status PythonGcsClient::RequestClusterResourceConstraint(
     int64_t timeout_ms,
     const std::vector<std::unordered_map<std::string, double>> &bundles) {
-  grpc::ClientContext context;
-  GrpcClientContextWithTimeoutMs(context, timeout_ms);
+  grpc::ClientContext &&context = PrepareContext(timeout_ms);
Suggested change:
-  grpc::ClientContext &&context = PrepareContext(timeout_ms);
+  grpc::ClientContext context = PrepareContext(timeout_ms);
Overall it looks good.
Could you run the failed release test to make sure it's working?
python/ray/_private/node.py (Outdated)

@@ -83,6 +83,9 @@ def __init__(
     )
     self.all_processes: dict = {}
     self.removal_lock = threading.Lock()
+    self.cluster_id = (
+        ray_params.cluster_id if hasattr(ray_params, "cluster_id") else None
+    )
Why hasattr?
Originally it was because there was no enforcement that the attribute was even declared, and when it's not declared, attribute access throws an exception instead of returning None (i.e. if ray_params.cluster_id = cluster_id is only set on some code paths).
But RayParams is fixed now per your other comment.
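A small illustration of the failure mode described above, using a hypothetical stand-in class rather than the real RayParams:

# Hypothetical stand-in: the attribute is only set on some code paths,
# so plain attribute access can raise AttributeError.
class FakeRayParams:
    pass

params = FakeRayParams()
# params.cluster_id  # would raise AttributeError here
cluster_id = params.cluster_id if hasattr(params, "cluster_id") else None
assert cluster_id is None  # hasattr guards the access and falls back to None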
good comments
src/ray/gcs/gcs_client/gcs_client.h (Outdated)

@@ -231,7 +232,20 @@ class RAY_EXPORT PythonGcsClient {
       const std::vector<std::unordered_map<std::string, double>> &bundles);
   Status GetClusterStatus(int64_t timeout_ms, std::string &serialized_reply);

+  ClusterID GetClusterId() const { return cluster_id_; }
nit: maybe const ClusterID&? Not a big deal, btw.
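A sketch of what the nit is suggesting, assuming the same cluster_id_ member as above:

  // Returning a const reference avoids copying the ClusterID on every call;
  // the caller must not keep the reference alive longer than the client.
  const ClusterID &GetClusterId() const { return cluster_id_; }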
Thanks! Let's make sure the broken release test passes
LGTM
src/ray/gcs/gcs_client/gcs_client.cc (Outdated)

  auto status =
      GrpcStatusToRayStatus(node_info_stub_->GetClusterId(&context, request, &reply));
  while (!status.IsTimedOut()) {
Should we fail after several retries?
Looks like all the other methods use timeouts
Added retries to emulate the @_auto_reconnect decorator on the Python side.
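A rough sketch of the bounded-retry pattern being discussed; the retry count, backoff, and any names beyond those in the diff above are illustrative, not the actual implementation:

#include <chrono>
#include <thread>

// Member-function sketch: node_info_stub_, request, reply, Status, and
// GrpcStatusToRayStatus come from the surrounding GCS client code.
// The retry cap and fixed one-second backoff are illustrative values.
constexpr int kMaxGetClusterIdRetries = 5;
Status status;
for (int attempt = 0; attempt < kMaxGetClusterIdRetries; ++attempt) {
  // A fresh ClientContext is required for each RPC attempt.
  grpc::ClientContext context;
  status = GrpcStatusToRayStatus(
      node_info_stub_->GetClusterId(&context, request, &reply));
  if (status.ok()) {
    break;  // got the cluster ID, stop retrying
  }
  std::this_thread::sleep_for(std::chrono::seconds(1));
}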
Force-pushed e50f292 to f7028b9
Force-pushed 68b3cd5 to 9b6f671
Force-pushed 9be9cf3 to 40c36cf
assert "(raylet)" not in out_str | ||
assert "(raylet)" not in err_str | ||
|
||
def check_output(blob): |
Not quite sure, but the test is to make sure no logs are streamed to the worker from the raylet. Should we keep that?
But we added new logs in this change that aren't errors, so we broke that invariant. Probably doesn't make sense to test for it anymore
ok fixed
f"Please check {os.path.join(self._logs_dir, 'gcs_server.out')}" | ||
" for details" | ||
) | ||
if hasattr(self, "_logs_dir"): |
Why do we need to check the attr now? Could you add a comment there?
I think this was always a bug, but the path never got triggered. In the init path, _logs_dir is initialized after GcsClient.
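A condensed illustration of the ordering issue; the class and attribute flow are simplified and are not the actual Node internals:

class Node:  # simplified stand-in for ray._private.node.Node
    def __init__(self):
        # The GCS client is created first; if it fails here, the error path
        # runs before _logs_dir has been assigned below.
        self._init_gcs_client()
        self._logs_dir = "/tmp/ray/session_latest/logs"

    def _init_gcs_client(self):
        # Guard with hasattr: _logs_dir may not exist yet on this error path.
        if hasattr(self, "_logs_dir"):
            print(f"Please check {self._logs_dir}/gcs_server.out for details")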
python/ray/_private/worker.py (Outdated)

@@ -1126,6 +1126,7 @@ def init(
     namespace: Optional[str] = None,
     runtime_env: Optional[Union[Dict[str, Any], "RuntimeEnv"]] = None,  # noqa: F821
     storage: Optional[str] = None,
+    no_gcs: bool = False,
We shouldn't change the API.
Should we have a global variable then? Initialization order might be tricky.
So the problem is that some tests don't start GCS, and then the newly added RPC hangs.
We shouldn't change the API. Still in the process of reviewing.
Force-pushed 6702492 to af111c7
This reverts commit cfe608c. Signed-off-by: Edward Oakes <ed.nmi.oakes@gmail.com>
An earlier [change](ray-project#37399) added a new RPC to each GCS client, causing some stress tests to fail. Instead of adding O(num_worker) RPCs, add O(num_node) RPCs by plumbing through the Python layer. Signed-off-by: Shreyas Krishnaswamy <shrekris@anyscale.com>
ray-project#38320) This reverts commit cfe608c. Signed-off-by: Edward Oakes <ed.nmi.oakes@gmail.com> Signed-off-by: Shreyas Krishnaswamy <shrekris@anyscale.com>
Redo #37583 Signed-off-by: vitsai <victoria@anyscale.com> Signed-off-by: vitsai <vitsai@cs.stanford.edu>
Why are these changes needed?
An earlier change added a new RPC to each GCS client, causing some stress tests to fail. Instead of adding O(num_worker) RPCs, add O(num_node) RPCs by plumbing through the Python layer.
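A high-level sketch of the plumbing idea described above; the function and parameter names are illustrative only and are not Ray's real APIs. The point is that the node process fetches the cluster ID once and hands it to each worker it starts, so only O(num_node) GetClusterId RPCs are issued:

# Illustrative sketch only; these names are not Ray's actual interfaces.
def start_node(gcs_client, num_workers):
    # One GetClusterId RPC per node, not per worker.
    cluster_id = gcs_client.get_cluster_id()
    for _ in range(num_workers):
        start_worker(cluster_id=cluster_id)

def start_worker(cluster_id):
    # Workers receive the cluster ID through the Python layer
    # instead of each issuing their own RPC to the GCS.
    print(f"worker using cluster id {cluster_id}")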
Related issue number
Checks
- I've signed off every commit (git commit -s) in this PR.
- I've run scripts/format.sh to lint the changes in this PR.
- If I've added a method in Tune, I've added it in doc/source/tune/api/ under the corresponding .rst file.